跳到主要内容

写一个最基本的 babyagi

import chromadb
from chromadb.utils.embedding_functions import ONNXMiniLM_L6_V2
import os
import openai
import logging
import re
from collections import deque
from typing import Dict, List
import importlib
import time

# Task storage supporting only a single instance of BabyAGI
# Extensions support begin

# Goal the agents work toward; it is interpolated into the execution,
# task-creation and prioritization prompts. (The literal is Chinese, roughly
# "points to consider for a group-buying feature".)
OBJECTIVE = "一个团购功能需要考虑的点"
# When False, an initial task is seeded at import time and the task list is
# re-prioritized after every executed task.
JOIN_EXISTING_OBJECTIVE = False


def can_import(module_name):
    """Report whether ``module_name`` can be imported.

    The check performs a real import, so the module's import-time side
    effects run when it is available.

    Args:
        module_name: Dotted name of the module to try.

    Returns:
        True when the import succeeds, False when it raises ImportError.
    """
    try:
        importlib.import_module(module_name)
    except ImportError:
        importable = False
    else:
        importable = True
    return importable

# Results storage using local ChromaDB


class DefaultResultsStorage:
    """Store and look up task results in a local, persistent Chroma collection."""

    def __init__(self, results_store_name):
        """Open the collection ``results_store_name``, creating it if needed.

        The client persists under the relative directory ``chroma``; the
        collection is configured for cosine distance and embeds documents
        with ``ONNXMiniLM_L6_V2``.
        """
        # Only surface errors from Chroma's own logger.
        logging.getLogger('chromadb').setLevel(logging.ERROR)

        settings = chromadb.config.Settings(persist_directory="chroma")
        client = chromadb.PersistentClient(settings=settings)
        self.collection = client.get_or_create_collection(
            name=results_store_name,
            metadata={"hnsw:space": "cosine"},
            embedding_function=ONNXMiniLM_L6_V2(),
        )

    def add(self, task: Dict, result: str, result_id: str):
        """Store ``result`` under ``result_id``, replacing any existing entry.

        Args:
            task: Task dict; only ``task["task_name"]`` is read.
            result: Result text, stored as the document and in the metadata.
            result_id: Identifier of the entry to create or overwrite.
        """
        existing = self.collection.get(ids=[result_id], include=[])
        already_stored = len(existing["ids"]) > 0
        # Same payload either way; only the collection method differs.
        write = self.collection.update if already_stored else self.collection.add
        write(
            ids=result_id,
            # No precomputed vectors are supplied.
            embeddings=None,
            documents=result,
            metadatas={"task": task["task_name"], "result": result},
        )

    def query(self, query: str, top_results_num: int) -> List[dict]:
        """Return the ``task`` metadata of the entries closest to ``query``.

        Args:
            query: Text to search with.
            top_results_num: Upper bound on the number of entries returned.

        Returns:
            The ``"task"`` metadata value of each hit (the task names written
            by ``add``), or an empty list when the collection is empty.
        """
        stored_count: int = self.collection.count()
        if stored_count == 0:
            return []
        hits = self.collection.query(
            query_texts=query,
            # Never request more results than the collection holds.
            n_results=min(top_results_num, stored_count),
            include=["metadatas"],
        )
        # A single query text was sent, so only the first result row is used.
        first_row = hits["metadatas"][0]
        return [metadata["task"] for metadata in first_row]


class SingleTaskListStorage:
    """In-memory FIFO queue of task dicts plus a running task-id counter."""

    def __init__(self):
        """Start with an empty queue and the id counter at zero."""
        self.tasks = deque()
        self.task_id_counter = 0

    def append(self, task: Dict):
        """Add ``task`` to the back of the queue."""
        self.tasks.append(task)

    def replace(self, tasks: List[Dict]):
        """Discard the current queue and use ``tasks`` (in order) instead."""
        self.tasks = deque(tasks)

    def popleft(self):
        """Remove and return the task at the front of the queue."""
        return self.tasks.popleft()

    def is_empty(self):
        """Return True when no tasks are queued."""
        return not self.tasks

    def next_task_id(self):
        """Advance the counter and return the new value (first call gives 1)."""
        new_id = self.task_id_counter + 1
        self.task_id_counter = new_id
        return new_id

    def get_task_names(self):
        """Return the ``task_name`` of every queued task, front to back."""
        return [queued["task_name"] for queued in self.tasks]


def use_chroma():
    """Announce the Chroma backend on stdout and return a results store.

    Returns:
        A ``DefaultResultsStorage`` bound to the ``babyagi_results_table``
        collection.
    """
    highlight_on = "\033[93m\033[1m"
    highlight_off = "\033[0m\033[0m"
    print("\nUsing results storage: " + highlight_on + "Chroma (Default)" + highlight_off)
    return DefaultResultsStorage('babyagi_results_table')


# Module-level storage shared by the agents and main() below.
# In-memory queue of pending tasks.
tasks_storage = SingleTaskListStorage()
# Chroma-backed store of completed task results. This runs at import time, so
# importing the module opens the collection and prints the storage banner.
results_storage = use_chroma()


def openai_call(
    prompt: str,
    max_tokens: int = 100,
):
    """Send ``prompt`` to ``gpt-3.5-turbo`` as one user message and return the reply.

    The call is repeated until it succeeds: whenever one of the OpenAI errors
    listed below is raised, a notice is printed, the function sleeps for
    10 seconds and then tries again. Any other exception propagates.

    Args:
        prompt: Text sent as the content of the single user message.
        max_tokens: Completion token limit passed to the API.

    Returns:
        The message content of the first choice in the response.
    """
    # Checked in this order, mirroring a chain of ``except`` clauses: the first
    # entry whose type matches the raised error supplies the notice.
    retry_notices = (
        (
            openai.error.RateLimitError,
            " *** The OpenAI API rate limit has been exceeded. Waiting 10 seconds and trying again. ***",
        ),
        (
            openai.error.Timeout,
            " *** OpenAI API timeout occurred. Waiting 10 seconds and trying again. ***",
        ),
        (
            openai.error.APIError,
            " *** OpenAI API error occurred. Waiting 10 seconds and trying again. ***",
        ),
        (
            openai.error.APIConnectionError,
            " *** OpenAI API connection error occurred. Check your network settings, proxy configuration, SSL certificates, or firewall rules. Waiting 10 seconds and trying again. ***",
        ),
        (
            # NOTE(review): an invalid request is unlikely to succeed on retry,
            # so this entry can loop forever; kept to preserve existing behaviour.
            openai.error.InvalidRequestError,
            " *** OpenAI API invalid request. Check the documentation for the specific API method you are calling and make sure you are sending valid and complete parameters. Waiting 10 seconds and trying again. ***",
        ),
        (
            openai.error.ServiceUnavailableError,
            " *** OpenAI API service unavailable. Waiting 10 seconds and trying again. ***",
        ),
    )
    retryable_errors = tuple(error_type for error_type, _ in retry_notices)

    while True:
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                max_tokens=max_tokens,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
            )
            return response.choices[0].message.content
        except retryable_errors as error:
            notice = next(
                text
                for error_type, text in retry_notices
                if isinstance(error, error_type)
            )
            print(notice)
            time.sleep(10)  # Wait 10 seconds and try again


def task_creation_agent(
    objective: str, result: Dict, task_description: str, task_list: List[str]
):
    """Ask the model for follow-up tasks based on the last task's result.

    Args:
        objective: Overall objective inserted into the prompt.
        result: Dict whose ``"data"`` entry is the last task's result text.
        task_description: Name/description of the task that produced ``result``.
        task_list: Names of tasks still pending; when non-empty they are
            listed in the prompt and the model is told not to overlap them.

    Returns:
        A list of ``{"task_name": ...}`` dicts, one per accepted response line.
        A line is accepted when it contains a period, the text before the
        first period contains at least one digit, and the text after it is
        non-empty once punctuation has been stripped.
    """
    sections = [f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""]

    if task_list:
        sections.append(f"These are incomplete tasks: {', '.join(task_list)}\n")
    sections.append("Based on the result, return a list of tasks to be completed in order to meet the objective. ")
    if task_list:
        sections.append("These new tasks must not overlap with incomplete tasks. ")

    sections.append("""
Return one task per line in your response. The result must be a numbered list in the format:

#. First task
#. Second task

The number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output.""")
    prompt = "".join(sections)

    print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')
    response = openai_call(prompt, max_tokens=2000)
    print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n')

    created = []
    for line in response.split('\n'):
        number_part, dot, remainder = line.strip().partition(".")
        if not dot:
            # No period on this line, so it cannot be a numbered entry.
            continue
        digits = ''.join(ch for ch in number_part if ch.isnumeric())
        # Drop everything that is not a word character or whitespace.
        cleaned_name = re.sub(r'[^\w\s_]+', '', remainder).strip()
        if cleaned_name and digits.isnumeric():
            created.append({"task_name": cleaned_name})
    return created


def prioritization_agent():
    """Ask the model to rank the queued tasks and return them in ranked order.

    The names currently in ``tasks_storage`` are sent to the model together
    with ``OBJECTIVE``. Each numbered line of the reply becomes one task dict.

    A returned task keeps the ``task_id`` it already has in ``tasks_storage``
    (matched on its punctuation-stripped name). A name that matches no queued
    task receives a fresh id from ``tasks_storage.next_task_id()``. The
    model's rank number is used only to recognise numbered lines; using it as
    the id would make ids repeat across iterations, so results stored under
    ``result_<id>`` would overwrite one another.

    Returns:
        A list of ``{"task_id": ..., "task_name": ...}`` dicts in ranked
        order, or None when the model's response is empty (the caller then
        leaves the task list unchanged).
    """
    task_names = tasks_storage.get_task_names()
    # Held in a variable so the f-string expression below contains no backslash.
    bullet_string = '\n'

    prompt = f"""
You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)}
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the ranked tasks as a numbered list in the format:

#. First task
#. Second task

The entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your ranked list or follow your list with any other output."""

    print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n')
    response = openai_call(prompt, max_tokens=2000)
    print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n')
    if not response:
        print('Received empty response from priotritization agent. Keeping task list unchanged.')
        return None

    def _clean(name):
        # Same normalisation that is applied to the model's lines below.
        return re.sub(r'[^\w\s_]+', '', str(name)).strip()

    # Existing ids per normalised name; a queue per name keeps duplicate
    # task names from sharing a single id.
    queued_ids = {}
    for queued in tasks_storage.tasks:
        if "task_id" in queued:
            queued_ids.setdefault(_clean(queued["task_name"]), deque()).append(
                queued["task_id"]
            )

    new_tasks_list = []
    for task_string in response.split("\n"):
        rank_part, dot, name_part = task_string.strip().partition(".")
        if not dot:
            continue
        rank = ''.join(ch for ch in rank_part if ch.isnumeric())
        task_name = _clean(name_part)
        # Require a numbered, non-empty entry so stray header or prose lines
        # are not turned into tasks.
        if not (rank and task_name):
            continue
        matching_ids = queued_ids.get(task_name)
        if matching_ids:
            task_id = matching_ids.popleft()
        else:
            task_id = tasks_storage.next_task_id()
        new_tasks_list.append({"task_id": task_id, "task_name": task_name})

    return new_tasks_list


# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:
    """Have the model carry out ``task`` in service of ``objective``.

    Up to five previously stored task names, retrieved with the objective as
    the query, are added to the prompt when any exist.

    Args:
        objective: Overall objective placed at the top of the prompt.
        task: The task the model is asked to perform.

    Returns:
        The model's reply, as returned by ``openai_call``.
    """
    context = context_agent(query=objective, top_results_num=5)

    pieces = [f'Perform one task based on the following objective: {objective}.\n']
    if context:
        pieces.append('Take into account these previously completed tasks:')
        pieces.append('\n'.join(context))
    pieces.append(f'\nYour task: {task}\nResponse:')

    return openai_call(''.join(pieces), max_tokens=2000)


# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):
    """Look up stored results relevant to ``query``.

    Args:
        query: Text used to search the results storage.
        top_results_num: Maximum number of entries to retrieve.

    Returns:
        Whatever ``results_storage.query`` returns for these arguments.
    """
    return results_storage.query(query=query, top_results_num=top_results_num)


# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:
    # Seed the queue so a fresh run has a first task to execute.
    initial_task = dict(
        task_id=tasks_storage.next_task_id(),
        task_name='Develop a task list',
    )
    tasks_storage.append(initial_task)


def main():
    """Run the agent loop until the task queue is empty.

    Each iteration prints the queue, pops the first task, executes it, stores
    the result, appends any newly created tasks, optionally re-prioritizes
    the queue, and then sleeps for five seconds. ``Done.`` is printed once no
    tasks remain.
    """
    style_reset = "\033[0m\033[0m"

    while not tasks_storage.is_empty():
        # Show what is currently queued.
        print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + style_reset)
        for queued_name in tasks_storage.get_task_names():
            print(" • " + str(queued_name))

        # Step 1: take the first pending task and execute it.
        task = tasks_storage.popleft()
        print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + style_reset)
        print(str(task["task_name"]))

        result = execution_agent(OBJECTIVE, str(task["task_name"]))
        print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + style_reset)
        print(result)

        # Step 2: wrap the result (no enrichment is done) and store it under
        # an id derived from the task id.
        enriched_result = {"data": result}
        results_storage.add(task, result, f"result_{task['task_id']}")

        # Step 3: create follow-up tasks, then re-prioritize the queue.
        created_tasks = task_creation_agent(
            OBJECTIVE,
            enriched_result,
            task["task_name"],
            tasks_storage.get_task_names(),
        )

        print('Adding new tasks to task_storage')
        for created in created_tasks:
            created["task_id"] = tasks_storage.next_task_id()
            print(str(created))
            tasks_storage.append(created)

        if not JOIN_EXISTING_OBJECTIVE:
            ranked = prioritization_agent()
            # An empty or None ranking leaves the queue untouched.
            if ranked:
                tasks_storage.replace(ranked)

        # Pause briefly before looking at the queue again.
        time.sleep(5)

    print('Done.')


# Start the agent loop only when this file is executed as a script.
if __name__ == "__main__":
    main()